feat: parallel Comparison.from_netcdf and .expand (not planned)#672
Conversation
…workers

Both methods accept `max_workers` and use a thread pool. `from_netcdf` serializes the actual netCDF4 read under a module-level lock (the C library is not thread-safe) and parallelizes the FlowSystem deserialization; `expand` parallelizes per-system expansion and passes through systems without clustering, so mixed comparisons work. The benchmarked speedup is modest (~1.2–1.3× for loads, ~1× for expand); thread overhead largely cancels out the small fraction of work that actually releases the GIL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
📝 Walkthrough

The changes add two new methods to the `Comparison` class: a parallel `from_netcdf` loader and a parallel `expand`.
Sequence Diagrams

```mermaid
sequenceDiagram
    participant Client
    participant ThreadPool
    participant Lock
    participant FileSystem as File System
    participant FlowSystem
    participant Comparison
    Client->>ThreadPool: from_netcdf(paths, max_workers)
    ThreadPool->>ThreadPool: Create worker tasks for each path
    loop For each NetCDF file
        ThreadPool->>Lock: Acquire _NETCDF_READ_LOCK
        Lock-->>ThreadPool: Lock acquired
        ThreadPool->>FileSystem: load_dataset_from_netcdf(path)
        FileSystem-->>ThreadPool: Dataset
        ThreadPool->>Lock: Release _NETCDF_READ_LOCK
        ThreadPool->>FlowSystem: FlowSystem.from_dataset(dataset)
        FlowSystem-->>ThreadPool: FlowSystem instance
    end
    ThreadPool-->>Comparison: All FlowSystem objects
    Comparison-->>Client: Comparison with case names
```

```mermaid
sequenceDiagram
    participant Client
    participant ThreadPool
    participant FlowSystem
    participant Transform
    participant Comparison
    Client->>ThreadPool: expand(max_workers)
    ThreadPool->>ThreadPool: Create tasks for each FlowSystem
    loop For each FlowSystem
        alt FlowSystem.clustering is not None
            ThreadPool->>Transform: fs.transform.expand()
            Transform-->>ThreadPool: Expanded FlowSystem
        else
            ThreadPool-->>ThreadPool: Pass through unchanged
        end
    end
    ThreadPool-->>Comparison: Expanded systems (names preserved)
    Comparison-->>Client: New Comparison instance
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
🧹 Nitpick comments (2)
flixopt/comparison.py (1)
34-38: Consider moving the netCDF lock into `load_dataset_from_netcdf` itself.

Putting the lock inside `flixopt/io.py::load_dataset_from_netcdf` would protect all concurrent callers, not just `Comparison.from_netcdf`. As written, any other thread-pool consumer of `load_dataset_from_netcdf` (now or in the future) can still race with `from_netcdf` readers and segfault the shared HDF5/netCDF4 global state, because each call site must remember to take `_NETCDF_READ_LOCK` itself. Centralizing the lock next to the actual `xr.load_dataset(..., engine='netcdf4')` call makes the thread-safety guarantee a property of the loader rather than of each caller.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@flixopt/comparison.py` around lines 34-38: the module-level `_NETCDF_READ_LOCK` should be moved into `flixopt/io.py` and used inside `load_dataset_from_netcdf` so the `xr.load_dataset(..., engine='netcdf4')` call is wrapped by the lock; update `load_dataset_from_netcdf` to create or reference a `threading.Lock` (e.g., a module-scoped lock in `flixopt/io.py`) and acquire/release it around the actual netCDF4 read, and remove the external locking in `Comparison.from_netcdf` so callers no longer need to remember to take `_NETCDF_READ_LOCK`; this centralizes thread-safety at the loader rather than at each caller.

tests/test_comparison.py (1)
543-663: Solid coverage for the new parallel paths.

The list/dict input forms, serial-vs-parallel equivalence via `xr.testing.assert_identical`, full-timestep restoration (168 + 1), and the mixed clustered/non-clustered identity check together cover the meaningful behavioral contracts of `from_netcdf` and `expand`. The `pytest.importorskip('tsam')` guard in `clustered_systems` keeps the suite portable.

One small note: `test_from_netcdf_serial_matches_parallel` compares the default path against `max_workers=1`, but `ThreadPoolExecutor`'s default worker count varies by machine and Python version, so the test asserts determinism more than an explicit serial-vs-parallel contrast. Consider pinning the "parallel" side to `max_workers=2` to make the intent explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_comparison.py` around lines 543-663: `test_from_netcdf_serial_matches_parallel` may not exercise true parallelism under the default thread-pool configuration; make the "parallel" call explicitly multi-worker so the comparison is meaningful: call `fx.Comparison.from_netcdf([p1, p2], max_workers=2)` for `comp_parallel` (leave `comp_serial` as `max_workers=1`) and keep the subsequent assertions comparing `comp_parallel` and `comp_serial`.
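For context on why pinning matters: the default worker count is machine-dependent but never 1 on modern CPython. A quick check — note `_max_workers` is a private CPython attribute, inspected here only for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

# CPython (>= 3.8) defaults to min(32, cpu count + 4) workers -- never 1,
# but machine-dependent, which is why a test that wants a truly parallel
# path should pin max_workers explicitly rather than rely on the default.
with ThreadPoolExecutor() as ex:
    default_workers = ex._max_workers  # private attribute, illustration only

assert default_workers >= 5  # even a single-core runner gets 5 workers
print(default_workers)
```

Pinning `max_workers=2` removes this machine dependence from the test entirely.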
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@flixopt/comparison.py`:
- Around line 246-251: The helper _load_one currently unconditionally sets
FlowSystem.name to Path(path).stem, overwriting user-supplied names; modify the
loading logic so that when callers supply an explicit name (e.g., via the dict
form feeding Comparison names=), you preserve that name instead of replacing it
— detect the provided label before calling _load_one (or change _load_one to
accept an optional name parameter), call FlowSystem.from_dataset(ds) and only
set fs.name = Path(path).stem if no explicit name was given; update any call
sites that construct Comparison from a mapping to pass the provided label into
_load_one.
---
Nitpick comments:
In `@flixopt/comparison.py`:
- Around line 34-38: The module-level _NETCDF_READ_LOCK should be moved into
flixopt/io.py and used inside load_dataset_from_netcdf so the
xr.load_dataset(..., engine='netcdf4') call is wrapped by the lock; update
load_dataset_from_netcdf to create or reference a threading.Lock (e.g., a
module-scoped lock in flixopt/io.py) and acquire/release it around the actual
netCDF4 read, and remove external locking in Comparison.from_netcdf so callers
no longer need to remember to take _NETCDF_READ_LOCK; this centralizes
thread-safety at the loader (load_dataset_from_netcdf) rather than at each
caller.
In `@tests/test_comparison.py`:
- Around line 543-663: The test test_from_netcdf_serial_matches_parallel can
falsely pass on single-core CI because the ThreadPoolExecutor default may be 1;
make the "parallel" call explicitly multi-worker so the comparison is
meaningful: in test_from_netcdf_serial_matches_parallel call
fx.Comparison.from_netcdf([p1, p2], max_workers=2) for comp_parallel (leave
comp_serial as max_workers=1) and keep the subsequent assertions comparing
comp_parallel and comp_serial (references:
test_from_netcdf_serial_matches_parallel, fx.Comparison.from_netcdf, variables
comp_parallel and comp_serial).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6fe5013d-8ea7-4470-bc0d-50c5d5cd7d5c
📒 Files selected for processing (2)
flixopt/comparison.py
tests/test_comparison.py
```python
def _load_one(path: str | _pl.Path) -> FlowSystem:
    with _NETCDF_READ_LOCK:
        ds = load_dataset_from_netcdf(path)
    fs = FlowSystem.from_dataset(ds)
    fs.name = _pl.Path(path).stem
    return fs
```
`fs.name` is overwritten with the filename stem even when the user supplies explicit names via a dict.

For the dict input form (`{path: 'baseline', ...}`), the explicit name is only applied at the `Comparison` level (via `names=`), while each `FlowSystem.name` is still forced to `Path(path).stem`. That leaves the underlying system with a name that differs from the case label the user asked for, which can be confusing when accessing `comp[i].name` or later rebuilding a `Comparison` from those systems.

Consider using the provided name when available:
Proposed tweak

```diff
-    if isinstance(paths, dict):
-        path_list = list(paths.keys())
-        names: list[str] | None = list(paths.values())
-    else:
-        path_list = list(paths)
-        names = None
-
-    def _load_one(path: str | _pl.Path) -> FlowSystem:
-        with _NETCDF_READ_LOCK:
-            ds = load_dataset_from_netcdf(path)
-        fs = FlowSystem.from_dataset(ds)
-        fs.name = _pl.Path(path).stem
-        return fs
-
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        flow_systems = list(executor.map(_load_one, path_list))
+    if isinstance(paths, dict):
+        path_list = list(paths.keys())
+        names: list[str] | None = list(paths.values())
+    else:
+        path_list = list(paths)
+        names = None
+
+    stem_names = [_pl.Path(p).stem for p in path_list]
+    fs_names = names if names is not None else stem_names
+
+    def _load_one(item: tuple[str | _pl.Path, str]) -> FlowSystem:
+        path, fs_name = item
+        with _NETCDF_READ_LOCK:
+            ds = load_dataset_from_netcdf(path)
+        fs = FlowSystem.from_dataset(ds)
+        fs.name = fs_name
+        return fs
+
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        flow_systems = list(executor.map(_load_one, zip(path_list, fs_names, strict=True)))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@flixopt/comparison.py` around lines 246 - 251, The helper _load_one currently
unconditionally sets FlowSystem.name to Path(path).stem, overwriting
user-supplied names; modify the loading logic so that when callers supply an
explicit name (e.g., via the dict form feeding Comparison names=), you preserve
that name instead of replacing it — detect the provided label before calling
_load_one (or change _load_one to accept an optional name parameter), call
FlowSystem.from_dataset(ds) and only set fs.name = Path(path).stem if no
explicit name was given; update any call sites that construct Comparison from a
mapping to pass the provided label into _load_one.
Summary

- `Comparison.from_netcdf(paths, max_workers=None)` classmethod — accepts a list of paths or a dict mapping paths → names, loads with a thread pool.
- `Comparison.expand(max_workers=None)` method — parallel expansion of clustered FlowSystems, passes through systems without clustering.
- Each returns a new `Comparison` instance. The file read in `from_netcdf` is serialized under a module-level lock because the netCDF4 C library is not thread-safe; only the CPU-bound deserialization (JSON attrs + `FlowSystem.from_dataset`) actually runs in parallel.

Benchmark results (why this may be closed)
Honest numbers — the parallelism doesn't pay off much. Benchmark case: `Comparison.expand` (6× clustered, 720 h).

Profile: ~70 % of per-file work is in `load_dataset_from_netcdf` (a GIL-held netCDF4 read, now under our lock) and only ~30 % in `FlowSystem.from_dataset`. Theoretical thread ceiling ≈ 1.4×, real ≈ 1.2–1.3×. Process-pool startup + FlowSystem pickling dwarf any parallelism win at every size we tried.

Test plan
- `tests/test_comparison.py::TestComparisonFromNetcdf` and `TestComparisonExpand` (list/dict input, serial == parallel equivalence, mixed-cluster pass-through, full-timestep restoration).
- Full `tests/test_comparison.py` suite passes (54 tests).

🤖 Generated with Claude Code
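The ≈1.4× theoretical ceiling quoted in the benchmark section is just Amdahl's law applied to the ~70 % serialized fraction; a quick self-contained check:

```python
# Amdahl's law: with fraction s of the work serialized (here, the locked
# netCDF read) and the rest parallelizable across n workers,
# speedup(n) = 1 / (s + (1 - s) / n).
def amdahl(serial_fraction: float, workers: int) -> float:
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / workers)

print(round(amdahl(0.7, 4), 2))      # 4 workers: 1.29
print(round(amdahl(0.7, 10**6), 2))  # effectively infinite workers: 1.43 ceiling
```

With the measured 1.2–1.3× sitting just under the 4-worker figure, the observed numbers are consistent with the profile rather than with any thread-pool inefficiency.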
Summary by CodeRabbit
New Features
Tests